package com.cloudansys.core.flink.sink;

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.cache.CacheManager;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.FLQXEntity;
import com.cloudansys.core.entity.LiftDataEntity;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.*;
import com.cloudansys.hawkeye.modules.settings.model.BaseThreshold;
import com.cloudansys.hawkeye.modules.settings.model.vo.PrimaryWarn;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import redis.clients.jedis.Jedis;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Flink sink that turns one batch of inclinometer readings into flange-tilt records.
 *
 * <p>For every batch it collects the readings of inclinometers "1".."3" (top flange) and
 * "4".."6" (bottom flange), computes the tilt angle and tilt direction of each flange,
 * evaluates the warning thresholds, publishes the result to MQTT and stores it in InfluxDB.
 * Warnings are additionally pushed to a Redis list and (rate limited) to an MQTT warn topic.
 *
 * <p>All connection handles are per-instance and {@code transient}: they are created in
 * {@link #open(Configuration)} on the task side and released in {@link #close()}, so parallel
 * sink instances running in the same JVM never overwrite or close each other's handles.
 */
@Slf4j
public class QJRedisSink extends RichSinkFunction<List<MultiDataEntity>> {

    /** Serial code of the composite top-flange sensor. */
    private static final String TOP_FLANGE_SERIAL_CODE = "74";
    /** Serial code of the composite bottom-flange sensor. */
    private static final String BOTTOM_FLANGE_SERIAL_CODE = "75";
    /** Slots of a flange buffer: {pickTime, x1, y1, x2, y2, x3, y3}. */
    private static final int FLANGE_SLOT_COUNT = 7;
    /** Quota slots written by this sink: tilt angle, direction, x1..y3 and hub height (index 8). */
    private static final int REQUIRED_QUOTA_COUNT = 9;
    /**
     * Minimum number of seconds between two warn publications to MQTT.
     * NOTE(review): the original comment said 300 seconds ("provisional for testing") while the
     * code has always used 30; the code value is kept - confirm the intended interval.
     */
    private static final long WARN_PUBLISH_INTERVAL_SECONDS = 30;

    private transient Jedis jedis;
    private transient MqttClient mqttClient;
    private transient InfluxDB influxDB;
    private transient String topic_warn;
    private transient String topic_data_fl;
    private transient String redisWarnsKey;
    /** Epoch millis of the last warn publication; 0 means "never published". */
    private transient long lastTimeMqtt;

    /**
     * Acquires the Redis, MQTT and InfluxDB handles and resolves the topic / key names.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
        mqttClient = DefaultConfig.getMqttClient();
        topic_warn = DefaultConfig.get(Const.MQTT_TOPIC_WARN);
        topic_data_fl = DefaultConfig.get(Const.MQTT_TOPIC_DATA_FL);
        influxDB = DefaultConfig.getInfluxDB();
        redisWarnsKey = JedisUtil.getWarnKey();
    }

    /**
     * Processes one batch: groups the inclinometer readings per flange, computes both flange
     * tilts, then publishes and stores every result that could be computed.
     *
     * @param element readings of one batch; entries with an unknown serial code or without
     *                usable values at index 2 and 3 are ignored
     * @param context sink context (unused)
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) throws Exception {
        if (element == null || element.isEmpty()) {
            return;
        }
        // Fixed-size buffers {pickTime, x1, y1, x2, y2, x3, y3}; a slot stays null until it is filled.
        List<String> tf = Arrays.asList(new String[FLANGE_SLOT_COUNT]);
        List<String> bf = Arrays.asList(new String[FLANGE_SLOT_COUNT]);

        for (MultiDataEntity reading : element) {
            if (reading == null || reading.getSerialCode() == null) {
                continue;
            }
            List<String> slots;
            int firstAxisSlot;
            switch (reading.getSerialCode()) {
                case "1":
                    slots = tf;
                    firstAxisSlot = 1;
                    break;
                case "2":
                    slots = tf;
                    firstAxisSlot = 3;
                    break;
                case "3":
                    slots = tf;
                    firstAxisSlot = 5;
                    break;
                case "4":
                    slots = bf;
                    firstAxisSlot = 1;
                    break;
                case "5":
                    slots = bf;
                    firstAxisSlot = 3;
                    break;
                case "6":
                    slots = bf;
                    firstAxisSlot = 5;
                    break;
                default:
                    // Not one of the six flange inclinometers.
                    continue;
            }
            Double[] values = reading.getValues();
            if (values == null || values.length < 4 || values[2] == null || values[3] == null) {
                // Leave the slots empty so that the affected flange is skipped for this batch.
                log.warn("inclinometer {} has no usable x/y values, reading ignored", reading.getSerialCode());
                continue;
            }
            if (firstAxisSlot == 1) {
                // The first inclinometer of each flange provides the pick time of the flange record.
                slots.set(0, reading.getPickTime());
            }
            slots.set(firstAxisSlot, String.valueOf(values[2]));
            slots.set(firstAxisSlot + 1, String.valueOf(values[3]));
        }

        // Tilt computation and warn evaluation per flange.
        FLQXEntity tfl = computeFLQX(jedis, TOP_FLANGE_SERIAL_CODE, tf);
        FLQXEntity bfl = computeFLQX(jedis, BOTTOM_FLANGE_SERIAL_CODE, bf);

        // Publish the flange data to MQTT.
        sendToMqtt(tfl);
        sendToMqtt(bfl);

        // Persist the flange data.
        saveToInfluxDB(tfl);
        saveToInfluxDB(bfl);
    }

    /**
     * Writes the flange record to InfluxDB; does nothing when the record is {@code null} or no
     * point could be built from it.
     */
    private void saveToInfluxDB(FLQXEntity flqxEntity) {
        if (flqxEntity == null) {
            return;
        }
        Point point = getPoint(flqxEntity);
        if (point != null) {
            influxDB.write(point);
        }
    }

    /**
     * Builds the InfluxDB point of a flange record: measurement is derived from the lower-cased
     * type tag, tags are project id and serial code, and every quota value becomes one field.
     *
     * @return the point, or {@code null} when building it failed (the failure is logged)
     */
    private Point getPoint(FLQXEntity multiDataEntity) {
        try {
            String targetType = multiDataEntity.getTypeTag().toLowerCase();
            Double[] values = multiDataEntity.getQuotaValues();
            long timestamp = DateUtil.parse(multiDataEntity.getPickTime(), Const.FMT_TRIM_MILLI).getTime();
            Point.Builder pointBuilder = Point
                    .measurement(Const.INFLUX_MEASUREMENT_BASE_MEAN + targetType)
                    // Timestamps are always stored with millisecond precision.
                    .time(timestamp, TimeUnit.MILLISECONDS)
                    .tag(Const.PID, multiDataEntity.getProjectId())
                    .tag(Const.SC, multiDataEntity.getSerialCode());
            for (int i = 0; i < values.length; i++) {
                // Field names are 1-based; values are always stored as double.
                // A null value fails the unboxing and drops the whole point (see catch below).
                double fieldValue = values[i];
                pointBuilder.addField(Const.INFLUX_MEASUREMENT_FIELD_BASE + (i + 1), fieldValue);
            }
            return pointBuilder.build();
        } catch (Exception e) {
            log.error("get point failed", e);
        }
        return null;
    }

    /**
     * Publishes the flange record as JSON to the data topic {@code <topic_data_fl>/<projectId>/<serialCode>}.
     * Failures are logged and never propagated.
     */
    private void sendToMqtt(FLQXEntity flqxEntity) {
        if (flqxEntity == null) {
            return;
        }
        try {
            String topic = StrUtil.stringsToStr(Const.SLASH, topic_data_fl,
                    flqxEntity.getProjectId(), flqxEntity.getSerialCode());
            MqttUtil.sendToMqtt(mqttClient, topic, JSON.toJSONString(flqxEntity));
        } catch (Exception e) {
            log.error("errMsg: {}", e.getMessage(), e);
            log.info("##### send to mqtt failed【data】#####");
        }
    }

    /**
     * @param projectId  project id
     * @param serialCode communication code of the sensor
     * @return the Redis key formatted by {@code JedisUtil.formatSmKey} from HK_SM, project id and serial code
     */
    private String getRedisKeyOfMean(String projectId, String serialCode) {
        List<String> list = new ArrayList<>();
        list.add(Const.HK_SM);
        list.add(projectId);
        list.add(serialCode);
        return JedisUtil.formatSmKey(list);
    }

    /**
     * Computes the tilt of one flange.
     *
     * <p>The inclinometer whose |x| + |y| is strictly the largest of the three is used, but only
     * when both of its own |x| and |y| are below 1 degree; otherwise x and y fall back to 0.
     * NOTE(review): when two inclinometers tie for the largest sum none is selected - confirm
     * whether a tie-break is wanted.
     *
     * @param jedis      Redis connection used to load the sensor base info
     * @param serialCode serial code of the composite flange sensor
     * @param fl         {pickTime, x1, y1, x2, y2, x3, y3}; must not contain {@code null}
     * @return the flange record with computed values and warn info, or {@code null} when the
     *         input is incomplete, not numeric, or the sensor base info is unavailable
     */
    private FLQXEntity computeFLQX(Jedis jedis, String serialCode, List<String> fl) {
        // Check the cheap precondition first so incomplete batches do not cost Redis round trips.
        if (fl == null || fl.contains(null)) {
            return null;
        }
        FLQXEntity targetInfo = getTargetInfo(jedis, serialCode);
        if (targetInfo == null) {
            return null;
        }

        double x1, y1, x2, y2, x3, y3;
        try {
            x1 = Double.parseDouble(fl.get(1));
            y1 = Double.parseDouble(fl.get(2));
            x2 = Double.parseDouble(fl.get(3));
            y2 = Double.parseDouble(fl.get(4));
            x3 = Double.parseDouble(fl.get(5));
            y3 = Double.parseDouble(fl.get(6));
        } catch (NumberFormatException e) {
            // Without numeric input there is nothing to publish or to judge.
            log.error("errMsg: {}", e.getMessage(), e);
            return null;
        }

        double sumOfAbs1 = NumUtil.getSumOfAbs(x1, y1);
        double sumOfAbs2 = NumUtil.getSumOfAbs(x2, y2);
        double sumOfAbs3 = NumUtil.getSumOfAbs(x3, y3);

        // The three "strictly largest" conditions are mutually exclusive.
        double x0 = 0;
        double y0 = 0;
        if (eachAbsLessThan1(x1, y1) && sumOfAbs1 > sumOfAbs2 && sumOfAbs1 > sumOfAbs3) {
            x0 = x1;
            y0 = y1;
        } else if (eachAbsLessThan1(x2, y2) && sumOfAbs2 > sumOfAbs1 && sumOfAbs2 > sumOfAbs3) {
            x0 = x2;
            y0 = y2;
        } else if (eachAbsLessThan1(x3, y3) && sumOfAbs3 > sumOfAbs1 && sumOfAbs3 > sumOfAbs2) {
            // Validity must be checked on inclinometer 3 itself (previously checked x2/y2).
            x0 = x3;
            y0 = y3;
        }

        // result = {tilt angle, tilt direction}
        List<Double> result = compute(x0, y0);

        // Store the result together with the raw inclinometer values.
        Double[] quotaValues = targetInfo.getQuotaValues();
        quotaValues[0] = NumUtil.formatDouble(result.get(0));
        quotaValues[1] = NumUtil.formatDouble(result.get(1));
        quotaValues[2] = x1;
        quotaValues[3] = y1;
        quotaValues[4] = x2;
        quotaValues[5] = y2;
        quotaValues[6] = x3;
        quotaValues[7] = y3;
        targetInfo.setPickTime(CustomDateUtil.trimMilliToStdSec(fl.get(0)));
        targetInfo.setQuotaValues(quotaValues);

        return judgeWarn(targetInfo);
    }

    /**
     * @return {@code true} when both |x| and |y| are strictly below 1
     */
    private boolean eachAbsLessThan1(double x, double y) {
        return Math.abs(x) < 1 && Math.abs(y) < 1;
    }

    /**
     * Flange tilt formula.
     *
     * @param x x reading of the selected inclinometer, in degrees
     * @param y y reading of the selected inclinometer, in degrees
     * @return a two-element list {tilt angle, tilt direction}; both are 0 when no quadrant rule
     *         matches (e.g. NaN input)
     */
    private List<Double> compute(double x, double y) {
        double degree = 0;
        double direction = 0;

        // Intermediate terms of the formula; the evaluation order is kept as in the original.
        double v1 = Math.abs(x / (Math.abs(x) + Math.abs(y)) * Math.sqrt(2));
        double v2 = Math.sin(Math.toRadians(45));
        double v3 = Math.cos(Math.toRadians(45));
        double v4 = Math.atan(v1 * v2 / (1 - v1 * v3));
        double v5 = Math.abs(Math.tan(Math.toRadians(x)));
        double v6 = Math.toDegrees(Math.atan(((Math.abs(Math.tan(Math.toRadians(y))) - v5) / 90 * (90 -
                Math.toDegrees(v4)) + v5) / (1 / Math.sin(Math.toRadians(180 - 45 - Math.toDegrees(v4))) * v2)));
        double offset = Math.toDegrees(v4);

        if (x >= 0 && y >= 0) {
            // First quadrant; on the positive x axis the tilt is |x| with direction 0.
            if (y == 0) {
                degree = Math.abs(x);
                direction = 0;
            } else {
                degree = v6;
                direction = 90 - offset;
            }
        } else if (x < 0 && y > 0) {
            // Second quadrant.
            degree = v6;
            direction = 90 + offset;
        } else if (x < 0 && y <= 0) {
            // Third quadrant; on the negative x axis the tilt is |x| with direction 180.
            if (y == 0) {
                degree = Math.abs(x);
                direction = 180;
            } else {
                degree = v6;
                direction = 270 - offset;
            }
        } else if (x >= 0 && y < 0) {
            // Fourth quadrant.
            degree = v6;
            direction = 270 + offset;
        }

        List<Double> res = new ArrayList<>(2);
        res.add(degree);
        res.add(direction);
        return res;
    }

    /**
     * Evaluates the warn level of a flange record from its tilt angle (quota index 0).
     *
     * <p>The threshold bucket is selected by lift height (hub height minus {@code Const.TS_BASE}):
     * a bucket key k matches when {@code liftHeight < k} and {@code k - liftHeight <= 5}. Level 1
     * is checked first, then 2, then 3; "0" means no warning. When a warning is raised it is
     * handed to {@link #warnDealer(PrimaryWarn)}.
     *
     * @param flqxEntity flange record with computed quota values
     * @return the same record, enriched with warn quota id, thresholds and warn level where available
     */
    private FLQXEntity judgeWarn(FLQXEntity flqxEntity) {
        try {
            // The warning is judged on the first quota (tilt angle).
            flqxEntity.setWarnQuotaId("1");
            Double[] quotaValues = flqxEntity.getQuotaValues();
            Double qx = quotaValues[0];
            // Current project id, i.e. the lifting stage.
            int pid = Integer.parseInt(flqxEntity.getProjectId());
            // Current hub height.
            Double currentHeight = quotaValues[8];
            if (qx == null || currentHeight == null) {
                log.warn("tilt angle or hub height missing, warn judgement skipped, serialCode: {}",
                        flqxEntity.getSerialCode());
                return flqxEntity;
            }
            double tsHeight = currentHeight - Const.TS_BASE;

            // Threshold layout: {height bucket -> {project id -> {warn level -> limit}}}
            Map<Integer, Map<Integer, Map<Integer, Double>>> flThreshold = CacheManager.getFlThreshold();
            if (flThreshold == null) {
                log.warn("flange threshold cache is empty, warn judgement skipped");
                return flqxEntity;
            }
            for (Map.Entry<Integer, Map<Integer, Map<Integer, Double>>> bucket : flThreshold.entrySet()) {
                Integer k = bucket.getKey();
                if (!(tsHeight < k && (k - tsHeight) <= 5)) {
                    continue;
                }
                Map<Integer, Map<Integer, Double>> byProject = bucket.getValue();
                Map<Integer, Double> threshold = byProject == null ? null : byProject.get(pid);
                if (threshold == null) {
                    log.warn("no flange threshold configured for height bucket {} and project {}", k, pid);
                    continue;
                }
                Double level1 = threshold.get(1);
                Double level2 = threshold.get(2);
                Double level3 = threshold.get(3);
                if (level1 == null || level2 == null || level3 == null) {
                    log.warn("incomplete flange threshold for height bucket {} and project {}", k, pid);
                    continue;
                }
                // Explicit order: getThresholdJson reads the limit of level n at index n - 1.
                flqxEntity.setThresholds(new ArrayList<>(Arrays.asList(level1, level2, level3)));
                if (qx > level1) {
                    flqxEntity.setWarnLevel("1");
                } else if (qx > level2) {
                    flqxEntity.setWarnLevel("2");
                } else if (qx > level3) {
                    flqxEntity.setWarnLevel("3");
                } else {
                    flqxEntity.setWarnLevel("0");
                }
            }

            String entityWarnLevel = flqxEntity.getWarnLevel();
            if (entityWarnLevel != null && !"0".equals(entityWarnLevel)) {
                int warnLevel = Integer.parseInt(entityWarnLevel);
                PrimaryWarn primaryWarn = PrimaryWarn.builder()
                        .projectId(pid)
                        .targetId(Integer.parseInt(flqxEntity.getSerialCode()))
                        .targetTypeId(Integer.parseInt(flqxEntity.getTypeId()))
                        .quotaId(30)
                        .targetCode(flqxEntity.getTargetCode())
                        .quotaName(flqxEntity.getQuotaNames()[0])
                        .time(new Date())
                        .dataTime(flqxEntity.getPickTime())
                        .dataValue(flqxEntity.getQuotaValues()[0])
                        .rawData(flqxEntity.getQuotaValues())
                        .level(warnLevel)
                        .currentHeight(currentHeight)
                        .thresholdName(getThresholdJson(flqxEntity))
                        .message(getMessage(warnLevel))
                        .build();
                warnDealer(primaryWarn);
            }
        } catch (NumberFormatException e) {
            log.error("errMsg: {}", e.getMessage(), e);
        }
        return flqxEntity;
    }

    /**
     * @return the display name of warn level 1, 2 or 3; {@code null} for any other level
     */
    private String getMessage(int warnLevel) {
        switch (warnLevel) {
            case 1:
                return "一级告警";
            case 2:
                return "二级告警";
            case 3:
                return "三级告警";
            default:
                return null;
        }
    }

    /**
     * Builds the {@code BaseThreshold} describing the range that was exceeded and serializes it.
     *
     * <p>The lower bound is the limit of the raised level; the upper bound is the limit of the
     * next more severe level, or 999 for level 1.
     *
     * @return the JSON text of the {@code BaseThreshold}
     */
    private String getThresholdJson(FLQXEntity flqxEntity) {
        int warnLevel = Integer.parseInt(flqxEntity.getWarnLevel());
        List<Double> thresholds = flqxEntity.getThresholds();
        Double lower = null;
        Double upper = null;
        if (warnLevel == 1) {
            lower = thresholds.get(0);
            upper = 999d;
        } else if (warnLevel == 2 || warnLevel == 3) {
            lower = thresholds.get(warnLevel - 1);
            upper = thresholds.get(warnLevel - 2);
        }
        BaseThreshold baseThreshold = BaseThreshold.builder()
                .targetId(Integer.parseInt(flqxEntity.getSerialCode()))
                .quotaId(15)
                .quotaCode(1)
                .quotaName(flqxEntity.getQuotaNames()[0])
                .quotaUnit(flqxEntity.getQuotaUnits()[0])
                .dataValue(flqxEntity.getQuotaValues()[0])
                .name(getMessage(warnLevel))
                .level(warnLevel)
                .lower(lower)
                .upper(upper)
                .absolute(false)
                .build();
        return JSON.toJSONString(baseThreshold);
    }

    /**
     * Handles a raised warning: refreshes its hub height from Redis, publishes it to the warn
     * topic (at most once per {@link #WARN_PUBLISH_INTERVAL_SECONDS}) and always appends it to
     * the Redis warn list.
     *
     * @param primaryWarn the warning to handle
     */
    private void warnDealer(PrimaryWarn primaryWarn) {
        // Refresh the hub height; on failure keep the height the warning was built with so the
        // warning itself is not lost.
        String heightText = jedis.get(Const.TS_REDIS);
        if (heightText != null) {
            try {
                double currentHeight = Double.parseDouble(heightText);
                primaryWarn.setCurrentHeight(currentHeight);
            } catch (NumberFormatException e) {
                log.error("errMsg: {}", e.getMessage(), e);
            }
        }

        String projectId = String.valueOf(primaryWarn.getProjectId());
        String sc = String.valueOf(primaryWarn.getTargetId());
        String topic = StrUtil.stringsToStr(Const.SLASH, topic_warn, projectId, sc);
        String jsonString = JSON.toJSONString(primaryWarn);

        // Rate limit the MQTT publication (whole seconds, strictly greater than the interval).
        long diffTime = (System.currentTimeMillis() - lastTimeMqtt) / 1000;
        if (diffTime > WARN_PUBLISH_INTERVAL_SECONDS) {
            log.info("primaryWarn: {}", jsonString);
            publish(topic, jsonString);
            lastTimeMqtt = System.currentTimeMillis();
        }

        // Every warning is also stored in the Redis warn list.
        jedis.lpush(redisWarnsKey, jsonString);
    }

    /**
     * Publishes a message to an MQTT topic; failures are logged and never propagated.
     *
     * @param topic topic to publish to
     * @param msg   message payload
     */
    private void publish(String topic, String msg) {
        try {
            MqttUtil.sendToMqtt(mqttClient, topic, msg);
        } catch (Exception e) {
            log.info("##### send 【{}】 to mqtt failed #####", topic);
            log.error("errMsg: {}", e.getMessage(), e);
        }
    }

    /**
     * Loads the base info of a flange sensor from Redis (key {@code hk:bi:tgt:<projectId>_<serialCode>})
     * and returns it as a fresh record whose quota values are empty except for the current hub
     * height at index 8.
     *
     * @return the record, or {@code null} when the project id, hub height or sensor info is
     *         missing or malformed (the reason is logged)
     */
    private FLQXEntity getTargetInfo(Jedis jedis, String serialCode) {
        try {
            // Current project id, i.e. the lifting stage.
            String projectId = jedis.get(Const.PID);
            // Current hub height.
            String heightText = jedis.get(Const.TS_REDIS);
            if (projectId == null || heightText == null) {
                log.error("project id or hub height missing in redis, serialCode: {}", serialCode);
                return null;
            }
            double ts = Double.parseDouble(heightText);

            String tgtKey = "hk:bi:tgt:" + projectId + Const.BAR_BOTTOM + serialCode;
            String s = jedis.get(tgtKey);
            if (s == null) {
                log.error("获取传感器基础信息失败！redisKey 为：{}", tgtKey);
                return null;
            }
            // NOTE(review): getBytes() uses the platform charset - confirm it matches the writer side.
            LiftDataEntity liftDataEntity = (LiftDataEntity) SerializeUtil.deserialize(s.getBytes());
            if (liftDataEntity == null || liftDataEntity.getQuotaNames() == null
                    || liftDataEntity.getQuotaNames().length < REQUIRED_QUOTA_COUNT) {
                log.error("sensor base info is missing or defines fewer than {} quotas, redisKey: {}",
                        REQUIRED_QUOTA_COUNT, tgtKey);
                return null;
            }

            Double[] quotaValues = new Double[liftDataEntity.getQuotaNames().length];
            quotaValues[8] = NumUtil.formatDouble(ts);
            return FLQXEntity.builder()
                    .projectId(liftDataEntity.getProjectId())
                    .serialCode(liftDataEntity.getSerialCode())
                    .targetCode(liftDataEntity.getTargetCode())
                    .targetCodeAbbr(liftDataEntity.getTargetCodeAbbr())
                    .typeId(liftDataEntity.getTypeId())
                    .quotaNames(liftDataEntity.getQuotaNames())
                    .quotaUnits(liftDataEntity.getQuotaUnits())
                    .quotaValues(quotaValues)
                    .site(liftDataEntity.getSite())
                    .typeName(liftDataEntity.getTypeName())
                    .typeTag(liftDataEntity.getTypeTag())
                    .build();
        } catch (NumberFormatException e) {
            log.error("errMsg: {}", e.getMessage(), e);
        }
        return null;
    }

    /**
     * Releases the Redis, InfluxDB and MQTT handles. Every handle is attempted even when an
     * earlier one fails; the first failure is rethrown with later ones attached as suppressed.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            if (jedis != null) {
                jedis.close();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (influxDB != null) {
                influxDB.close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        try {
            if (mqttClient != null) {
                // disconnect() fails on a client that is not connected.
                if (mqttClient.isConnected()) {
                    mqttClient.disconnect();
                }
                mqttClient.close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        jedis = null;
        influxDB = null;
        mqttClient = null;
        try {
            super.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

}
